RabbitMq的ack用法 | 您所在的位置:网站首页 › matlab clear用法 › RabbitMq的ack用法 |
仔細檢視一下 Consumer 的回撥方法: public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ...... consumerChannel1.basicAck(envelope.getDeliveryTag(), false); }當我們需要確認一條訊息已經被消費時,我們呼叫的 basicAck 方法的第一個引數是 Delivery Tag。 Delivery Tag 用來標識通道中投遞的訊息。RabbitMQ 推送訊息給 Consumer 時,會附帶一個 Delivery Tag,以便 Consumer 可以在訊息確認時告訴 RabbitMQ 到底是哪條訊息被確認了。 RabbitMQ 保證在每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增。 執行下面的例子可以直觀的看到這點: gordon.study.rabbitmq.ack.TestAckBasic.java public class TestAckBasic { private static final String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel consumerChannel1 = connection.createChannel(); consumerChannel1.queueDeclare(QUEUE_NAME, false, false, false, null); consumerChannel1.basicQos(3); Consumer consumer1 = new DefaultConsumer(consumerChannel1) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.printf("in consumer A (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } consumerChannel1.basicAck(envelope.getDeliveryTag(), false); } }; consumerChannel1.basicConsume(QUEUE_NAME, false, consumer1); final Channel consumerChannel2 = connection.createChannel(); consumerChannel2.basicQos(3); Consumer consumer2 = new DefaultConsumer(consumerChannel2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } consumerChannel2.basicAck(envelope.getDeliveryTag(), false); } }; consumerChannel2.basicConsume(QUEUE_NAME, false, consumer2); Channel senderChannel = connection.createChannel(); for (int i = 0; i < 10;) { String message = "NO. " + ++i; TimeUnit.MILLISECONDS.sleep(100); senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); } senderChannel.close(); } }result: in consumer A (delivery tag is 1): NO. 1 in consumer B (delivery tag is 1): NO. 2 in consumer A (delivery tag is 2): NO. 3 in consumer B (delivery tag is 2): NO. 4 in consumer A (delivery tag is 3): NO. 5 in consumer B (delivery tag is 3): NO. 6 in consumer A (delivery tag is 4): NO. 7 in consumer B (delivery tag is 4): NO. 8 in consumer A (delivery tag is 5): NO. 9 in consumer B (delivery tag is 5): NO. 10可見,兩個通道的 delivery tag 分別從 1 遞增到 5。(如果修改程式碼,將兩個 Consumer 共享同一個通道,則 delivery tag 是從 1 遞增到 10,參考 gordon.study.rabbitmq.ack.TestAckInOneChannel.java) basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,則額外將比第一個引數指定的 delivery tag 小的訊息一併確認。(批量確認針對的是整個通道,參考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。) 對同一訊息的重複確認,或者對不存在的訊息的確認,會產生 IO 異常,導致通道關閉。 B. 忘了確認會怎樣如果我們註釋掉22行,讓 consumerChannel1 不再確認訊息,世界會怎樣? 只要程式還在執行,這3條訊息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 訊息消費並沒有超時機制,也就是說,程式不重啟,訊息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的訊息。 當程式關閉時(實際只要 Consumer 關閉就行),這3條訊息會恢復為 Ready 狀態。 C. 取消確認當消費訊息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。 void basicReject(long deliveryTag, boolean requeue) throws IOException;第一個引數指定 delivery tag,第二個引數說明如何處理這個失敗訊息。requeue 值為 true 表示該訊息重新放回佇列頭,值為 false 表示放棄這條訊息。 一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如訊息格式錯誤,再處理多少次也是異常。呼叫第三方介面超時這類異常 requeue 應該設為 true。 從 basicReject 方法引數可見,取消確認不支援批量操作(類似於 basicAck 的 multiple 引數)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。參考 https://www.rabbitmq.com/nack.html PS:Reject 的訊息重新推送來時,delivery tag 就是新的值了。 |
今日新闻 |
推荐新闻 |
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 |